Audioデータをクラウドに送ってみました。 MQTT(basic ingest) + Amazon Kinesis Data Streams + Amazon Kinesis Data Firehose + S3 (バイナリ)
1 はじめに
CX事業本部の平内(SIN)です。
ここまで、エッジ側のAudioデータをクラウドへ送信する要領をいくつか試してみました。
上記のうち、最後だけは、バイナリ型式でデータを送ってみたのですが、実は、AWS IoT SDKのMQTTクライアントのPublishもpayloadは、bufferとなってます。
https://awslabs.github.io/aws-crt-python/api/mqtt.html?#awscrt.mqtt.Connection.publish
今回は、AudioデータをJSON化せず、バイナリ形式のまま送信する要領を確認してみました。
2 構成
構成は、前回とほぼ同じです。
デバイスからは、500msec単位で、Audioデータ(バイナリ形式)とタイムスタンプ(バイナリ形式)を結合したものをPayloadとしてMQTTで送信します。
Amazon Kinesis Data Streams、Amazon Kinesis Data Firehose及び、S3バケットでは、バイナリ形式のまま保存されます。
バイナリデータは、タイムスタンプとAudioのROWデータを結合して、1データが、8008byteです。
下記は、後で出てくる、basic ingestを使用せず、通常のTopicで送信したものをAWSのコンソールでsubscribeしている様子ですが、・・・当然文字化けしてます。
3 basic ingest
今回は、メッセージブローカーを通さずに、basic ingestで送信してみました。
Reducing messaging costs with basic ingest
subscribeする要件がなく、単純にルールエンジンで処理するだけであれば、メッセージブローカーを経由しなければ、その分のコスト削減が可能です。メッセージブローカーのコストは、他に比べて比較的高いので、データ量が多い場合は、是非検討したいところです。
basic ingestを使用するには、送信側でtopic名を($aws/rules/ルール名/元のTopic名)に変えるだけです。(ルールエンジンに側に変更は必要ありません)
作成しているルール名が、audio_transmission_ruleとなているので、変更は、下記のようになってます。
- 元のTopic名
topic/audio_transmission
- 変更後のTopic名
$aws/rules/audio_transmission_rule/topic/audio_transmission
ルールエンジンからAmazon Kinesis Data Streams、Amazon Kinesis Data Firehoseを経由して、最終的にS3に保存されている様子です。
4 MQTT(バイナリ)
MQTTで送信しているコードです。
publishのpayloadに、バイナリデータをそのまま指定してます。また、Topic名は、basic ingestを使用するため変更されています。
index.py
import pyaudio from producer import Producer import numpy as np DEVICE_INDEX = 0 CHANNELS = 2 SAMPLE_RATE = 32000 # サンプルレート FORMAT = pyaudio.paInt16 CHUNK = int(SAMPLE_RATE/2) # 500msごとに取得する # open stream p = pyaudio.PyAudio() stream = p.open(format = FORMAT, channels = CHANNELS, rate = SAMPLE_RATE, input = True, input_device_index = DEVICE_INDEX, frames_per_buffer = CHUNK) producer = Producer() try: print("start ...") while True: # 500ms分のデータ読み込み data = stream.read(CHUNK) # numpy配列に変換 data = np.frombuffer(data, dtype="int16") # チャンネル 2ch -> 1ch data = data[0::2] # サンプルレート 32000Hz -> 8000Hz data = data[0::4] # byteに戻す data = data.tobytes() producer.send(data) except: stream.stop_stream() stream.close() p.terminate()
producer.py
from mqtt import Mqtt import json from datetime import datetime import struct class Producer(): def __init__(self): # basic ingest self.__topic = "$aws/rules/audio_transmission_rule/topic/audio_transmission" root_ca = "./certs/RootCA.pem" cert = "./certs/xxxxxxxxxx-certificate.pem.crt" key = "./certs/xxxxxxxxxx-private.pem.key" endpoint = "xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" self.__mqtt = Mqtt(root_ca, key, cert, endpoint) def send(self, data): now = datetime.now() # 時刻とデータの結合 ts = now.timestamp() ts = struct.pack('<d', ts) transfer_data = ts + data try: self.__mqtt.publish(self.__topic, transfer_data) print("publish {}byte".format(len(transfer_data))) except Exception as e: print("Exception: {}", e.args)
5 その他
Amazon Kinesis Data Streams、Amazon Kinesis Data Firehoseや、S3に保存されている形式は、前回と変わりませんので、その他の部分のコードに変更はありません。
6 最後に
今回は、MQTTでバイナリデータを送信する要領を確認してみました。
エッジ側で発生したRAWデータを、そのままの形でクラウドに持ってくることは、要件によっては、有効かもしれません。 AWS IoT Analyticsなどでデータストアに一旦蓄積してから、改めて分析等に適した形に操作するというのも有りかも知れません。
全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_5